Task : Multi-Task management
The Task module provides multitasking support for JSRE.
Each Task is an independent operating system thread, which the operating system schedules independently according to its scheduling policy. A multitasking design can improve application parallelism.
Why Multi-Task
Although runtime platforms such as Node.js achieve parallelism through multi-threaded asynchronous proxies, the core program flow cannot be parallelized, and the programmer has little control over it and no controllable scheduling policy. JSRE takes a different approach: it gives programmers control over both the parallel design and the real-time task scheduling policy.
A multitasking design also decouples application code, lets application modules be separated and developed independently, and makes it easier to build large, complex applications.
Easy to develop
Multitasking also introduces development difficulties, including shared-resource safety, synchronization, and mutual exclusion. To reduce them, JSRE allocates a separate language environment context for each task, and the functions and objects in a context are not shared between tasks. This design greatly reduces development difficulty.
To offset the efficiency loss caused by not sharing data, JSRE provides a feature-rich and efficient inter-task communication mechanism, including shared memory (Shared.ArrayBuffer), publish/subscribe asynchronous communication (SigSlot), a multi-task shared K/V map (SyncTable), local procedure calls (LPC), and basic communication interfaces (task.send). These interfaces can transmit objects as well as strings.
The multitasking environment provided by JSRE is not only efficient, but also highly decoupled and easy to develop.
Example
- main.js
#!/bin/javascript
var task = new Task('./task.js');
var msg = { a: 3, b: 5 };
console.log('new task id:', task.id());
while (true) {
  task.send(msg);
  sys.sleep(1000);
}
- task.js
console.log('new task run!, id:', Task.me());
Task.async(false);
while (true) {
  var from = {};
  var msg = Task.recv(from);
  if (msg) {
    console.log('recv a message:', msg, 'from:', from);
  }
}
Or:
console.log('new task run!, id:', Task.me());
var iosched = require('iosched');
Task.on('message', function(msg, from) {
  console.log('recv a message:', msg, 'from:', from);
});
iosched.forever();
The example can be run as follows:
$ ./main.js
[JSRE-USR]new task id: 67174472
[JSRE-USR]new task run!, id: 67174472
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
[JSRE-USR]recv a message: {a:3,b:5} from: {id:67174470}
...
Support
The following table shows which Task module APIs are available in each permission mode.
API | User Mode | Privilege Mode |
---|---|---|
Task | ● | ● |
Task.me | ● | ● |
Task.exit | ● | ● |
Task.parent | ● | ● |
Task.create | ● | ● |
Task.count | ● | ● |
Task.list | ● | ● |
Task.yield | ● | ● |
Task.testCancel | ● | ● |
Task.priority | ● | ● |
Task.send | ● | ● |
Task.recv | ● | ● |
Task.flush | ● | ● |
Task.isMain | ● | ● |
Task.getCurrentSize | ● | ● |
Task.getBufferSize | ● | ● |
Task.setBufferSize | ● | ● |
Task.nextTick | ● | ● |
Task.cleanup | ● | ● |
Task.fd | ● | ● |
Task.async | ● | ● |
Task.tls | ● | ● |
Task.wtls | ● | ● |
Task.features | ● | ● |
task.id | ● | ● |
task.isAlive | ● | ● |
task.cancel | ● | ● |
task.priority | ● | ● |
task.detach | ● | ● |
task.send | ● | ● |
Task Class
new Task(jsFile[, arg[, opt]])
- jsFile {String} Specify the file to be run by the new task.
- arg {String} | {Object} New task argument. default: ''.
- opt {Object} New task creation options.
  - directory {String} New task file prefix directory.
- Returns: {task} A new task object.
Create a new task. The new task can get its argument from the ARGUMENT variable. Once a task is created, it starts executing immediately.
Example
var task = new Task('./task.js', 'test_arg_string');
Task.me()
- Returns: {Integer} Current task ID.
Get current task ID.
Example
var id = Task.me();
Task.exit([retValue])
- retValue {Integer} Return value of the current task. default: 0.
Exit the current task. It is recommended to call this function at the outermost layer and to avoid calling it in event callbacks. This function requests the JS engine to exit the task, and the JS engine exits the current task once execution returns to the outermost layer.
Example
var id = Task.me();
console.log('Current task id', id);
Task.exit(); // Exit
Task.parent()
- Returns: {Integer} Current task parent ID.
Get current task parent ID. Returns 0 if current task is main task.
Example
var parent = Task.parent();
Task.create(jsFile[, arg[, opt]])
- jsFile {String} Specify the file to be run by the new task.
- arg {String} | {Object} New task argument. default: ''.
- opt {Object} New task creation options.
  - directory {String} New task file prefix directory.
- Returns: {Object} New task object.
Same as new Task(jsFile[, arg[, opt]]).
Task.count()
- Returns: {Integer} Number of tasks.
Get the number of tasks in current process.
Example
var cnt = Task.count();
Task.list([exceptMe])
- exceptMe {Boolean} Whether to exclude the current task. default: false.
- Returns: {Array} Task ID array.
Get the ID list of all tasks in the current process.
Example
// Broadcast message
var tasks = Task.list(true);
tasks.forEach(function(id) {
  Task.send(id, { msg: 'broadcast message!' });
});
Task.yield()
Yield the CPU. If no other task of the same priority is ready, the current task continues executing.
Task.testCancel()
- Returns: {Boolean} Whether any task has requested deletion of the current task.
For safety reasons, tasks cannot be forcibly deleted. JSRE task deletion uses a request and response mechanism instead.
Example
while (true) {
  // Do something...
  if (Task.testCancel()) {
    // Do some delete prepare work...
    Task.exit(); // same as return;
  }
}
Tasks can use Task.testCancel() to test whether another task has requested deletion of the current task. If so, the task should prepare for exit and then either return at the outermost layer or call Task.exit() to complete the deletion.
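The request and response handshake can be modeled in plain JavaScript. The sketch below is an illustration only, not JSRE code: `makeCancelToken` and `runWorker` are hypothetical names, and a shared flag stands in for the request that Task.testCancel() reports.

```javascript
// Plain-JavaScript model of cooperative cancellation (not the JSRE API).
// A requester sets a flag; the worker polls it and exits on its own terms.
function makeCancelToken() {
  var requested = false;
  return {
    request: function() { requested = true; }, // plays the role of task.cancel()
    test: function() { return requested; }     // plays the role of Task.testCancel()
  };
}

function runWorker(token, maxSteps) {
  var log = [];
  for (var step = 0; step < maxSteps; step++) {
    if (token.test()) {
      log.push('cleanup'); // release resources before leaving
      return log;          // exit at the outermost layer
    }
    log.push('work ' + step);
    if (step === 1) {
      token.request();     // simulate another task asking for deletion
    }
  }
  return log;
}

var cancelLog = runWorker(makeCancelToken(), 5);
console.log(cancelLog.join(', '));
```

The worker is never interrupted in the middle of a step. It only stops at the point where it checks the flag, which is why the target task can finish its preparation work before exiting.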
Task.priority(prio)
- prio {String} New priority.
prio can only be one of:
- 'low'
- 'normal'
- 'high'
- 'realtime'
Change current task scheduling priority.
JSRE runs on the SylixOS kernel. SylixOS is a preemptive real-time operating system: when a high-priority task becomes ready, it preempts lower-priority tasks immediately. This scheduling policy improves the system's real-time response capability. Applications should set critical tasks to high priority and normal tasks to low priority.
Only privileged mode applications can set the 'realtime' priority.
Example
Task.priority('low');
Task.send(id, msg)
- id {Integer} Target task ID.
- msg {String} | {Object} Message to be sent.
- Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.
In the JSRE environment, each task has a message queue that receives messages sent by other tasks. Message boundaries are preserved: multiple messages are never merged into one, and each send or receive call handles exactly one message.
The message can be a string or an object. If it is an object, the system discards all of its methods before passing it to the target task.
Example
Task.send(otherTaskId, 'This is a test message!');
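To see what discarding methods means for the receiver, the following plain-JavaScript sketch approximates the transfer with a JSON round trip. This is an assumption made for illustration: how JSRE actually encodes messages is not described here, and `simulateTransfer` is a hypothetical helper, not part of the Task module.

```javascript
// Approximate an inter-task object transfer with a JSON round trip.
// Assumption: JSON is only a stand-in for JSRE's real encoding; like the
// behavior described above, it keeps data properties and drops functions.
function simulateTransfer(message) {
  return JSON.parse(JSON.stringify(message));
}

var outgoing = {
  id: 1,
  payload: { a: 3, b: 5 },
  describe: function() { return 'message ' + this.id; } // method: not transferred
};

var incoming = simulateTransfer(outgoing);
console.log(typeof incoming.describe); // the method is absent on the receiving side
console.log(incoming.payload.a + incoming.payload.b);
```

Because the receiver gets a copy that carries data only, any behavior the receiver needs has to be defined in the receiving task's own script.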
Task.send(id, buffer[, offset[, length[, addition]]])
- id {Integer} Target task ID.
- buffer {Buffer} The data to be sent.
- offset {Integer} Buffer offset. default: 0.
- length {Integer} Read length. default: buffer.length.
- addition {Object} Buffer additional information. default: no information.
- Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.
Send binary data to the target task. For large binary transfers, such as video, shared memory is recommended.
Example
var buffer = fs.readFile('./test');
Task.send(otherTaskId, buffer);
The addition object carries additional information that the receiver can use to determine the purpose of the buffer.
Example
var buffer = fs.readFile('./test');
Task.send(otherTaskId, buffer, undefined, undefined, { foo: 'bar' });
Task.recv([from[, timeout]])
- from {Object} Object in which the system reports where the message came from; its id attribute is set to the ID of the sending task. default: undefined.
- timeout {Integer} Receive timeout in milliseconds. default: undefined, meaning wait forever.
- Returns: {String} | {Object} | {Buffer} Received message.
The task receives a message of the same type that the sender sent: a string if the sender sent a string, an object if the sender sent an object. Task uses asynchronous mode by default, so users must first call Task.async(false) to switch to synchronous mode before using this method.
Example
Task.async(false); // convert to synchronous mode
while (true) {
  var from = {};
  var msg = Task.recv(from);
  if (msg) {
    console.log('Receive a message from:', from);
    switch (msg.id) {
    case 0:
      // Do something.
      break;
    case 1:
      // Do something.
      break;
    default:
      break;
    }
  }
}
Task.flush()
Clear the messages in the current task message queue. Unreceived messages will be discarded.
Example
// All message discarded.
Task.flush();
Task.isMain()
- Returns: {Boolean} Whether the current task is the main task of this process.
Check whether the current task is the main task of this process.
Example
if (Task.isMain()) {
  // This task is main task.
}
Task.getCurrentSize(id)
- id {Integer} Task ID.
- Returns: {Integer} Total message size in target task.
JSRE allows the user to set the size of a task's message queue buffer. When the total size of all unreceived messages reaches this limit, new messages are discarded. Task.getCurrentSize(id) gets the total size of all messages currently in the message queue of the specified task.
Example
var curMsgSize = Task.getCurrentSize(Task.me());
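The discard-when-full rule can be illustrated with a small plain-JavaScript model. It is a sketch, not the JSRE implementation: `BoundedQueue` is a hypothetical class, a message's size is assumed to be its string length, and a message is assumed to be discarded when it would push the total past the limit. JSRE's real size accounting is not described here.

```javascript
// Plain-JavaScript model of a size-limited message queue (not the JSRE API).
// Assumption: a message's size is its string length.
function BoundedQueue(bufferSize) {
  this.bufferSize = bufferSize; // limit, in the role of Task.setBufferSize()
  this.currentSize = 0;         // total size of unreceived messages
  this.messages = [];
}

// Returns true if the message was queued, false if it was discarded.
BoundedQueue.prototype.send = function(msg) {
  if (this.currentSize + msg.length > this.bufferSize) {
    return false;
  }
  this.messages.push(msg);
  this.currentSize += msg.length;
  return true;
};

// Removes and returns the oldest message, or undefined if the queue is empty.
BoundedQueue.prototype.recv = function() {
  var msg = this.messages.shift();
  if (msg !== undefined) {
    this.currentSize -= msg.length;
  }
  return msg;
};

var queue = new BoundedQueue(10);
var first = queue.send('hello');   // 5 of 10 used
var second = queue.send('world!'); // would need 11: discarded
queue.recv();                      // receiving frees 5
var third = queue.send('world!');  // 6 of 10 used
console.log(first, second, third, queue.currentSize);
```

In this model a slow receiver causes senders to see failed sends, which is one reason to check the return value of Task.send() and to size the buffer for the expected backlog.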
Task.getBufferSize(id)
- id {Integer} Task ID.
- Returns: {Integer} Message queue buffer size of the specified task.
Get the message queue buffer size of the specified task.
Example
var curQueueSize = Task.getBufferSize(Task.me());
Task.setBufferSize(id, size)
- id {Integer} Task ID.
- size {Integer} Task message queue buffer size.
Set the message queue buffer size of the specified task.
Example
// Set message queue size to 16KBytes.
Task.setBufferSize(Task.me(), 16 * 1024);
Task.nextTick(callback[, ...args])
- callback {Function} Callback function.
- ...args {Any} Any number of callback arguments of any type.
Task.nextTick() works like Node.js process.nextTick(). It adds a callback to the system pending queue, and the callback is called when the system allows interrupts.
JSRE supports an interrupt mechanism whose response mode controls when interrupts are handled. The mode is set with interrupt.level(option); valid options include:
Definition | Description |
---|---|
interrupt.ANYTIME | Respond to interrupts at any time. Only for Privileged Mode applications. |
interrupt.ONPEND | Respond to interrupts when the program is blocked, such as in a blocking network receive or send call. |
interrupt.ONRPEND | Respond to interrupts only when the system is blocked in a read/receive call. This is the default setting. |
interrupt.ONPOLL | Respond to interrupts only when blocked in sys.sleep() or iosched.poll() / forever(). |
When interrupt response is allowed, JSRE first checks whether the pending queue contains any callbacks and, if so, executes them immediately.
Example
function callback() {
  console.log('callback run!');
}
Task.nextTick(callback);
Task.cleanup(callback[, ...args])
- callback {Function} Callback function.
- ...args {Any} Any number of callback arguments of any type.
Add a callback function that is executed when the task exits normally. Callbacks are executed in the reverse of the order in which they were installed.
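The reverse execution order can be pictured as a stack. The sketch below models it in plain JavaScript so that it runs outside JSRE: `addCleanup` and `runCleanups` are hypothetical stand-ins for Task.cleanup() and for what happens when a task exits normally.

```javascript
// Plain-JavaScript model of exit callbacks run in reverse installation order.
var cleanups = [];
var order = [];

// Stand-in for Task.cleanup(callback[, ...args]).
function addCleanup(callback) {
  var args = Array.prototype.slice.call(arguments, 1);
  cleanups.push({ callback: callback, args: args });
}

// Stand-in for normal task exit: the last installed callback runs first.
function runCleanups() {
  while (cleanups.length > 0) {
    var entry = cleanups.pop();
    entry.callback.apply(null, entry.args);
  }
}

function release(name) {
  order.push(name);
}

addCleanup(release, 'close file');
addCleanup(release, 'close socket');
addCleanup(release, 'free buffer');
runCleanups();
console.log(order.join(' -> '));
```

Reverse order suits resources that depend on each other: whatever was acquired last is released first, before the resources it was built on.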
Task.fd()
- Returns: {Integer} Task message event file descriptor.
Get the message event file descriptor of the current task. It can only be used for iosched readable-event detection in the current task.
Example
var iosched = require('iosched');
Task.async(false); // Task.recv() requires synchronous mode
var ioevent = iosched.event(iosched.READ, Task.fd(), () => {
  var from = {};
  var msg = Task.recv(from);
  if (msg) {
    console.log('receive:', msg, 'from:', from);
  }
  return true;
});
iosched.add(ioevent);
while (true) {
  iosched.poll();
}
Task.async([enable])
- enable {Boolean} Whether to enable asynchronous mode. default: true.
Select whether task messages are received in asynchronous mode. When asynchronous mode is enabled, the Task.recv() method is removed, and messages are delivered through the message event instead.
Example
Task.async(); // asynchronous mode
Task.async(false); // synchronous mode
Task Local Storage
Each task has its own local storage manager, and tasks cannot access each other's local storage.
Task.tls
- {Map}
Task.tls is a Map object that is valid for the entire current task. The current task can store, read, traverse, and delete entries from anywhere. Note that a Map holds strong references to its stored objects: if entries are not deleted manually, a memory leak will occur.
Example
Task.tls.set(key, value);
Task.tls.get(key);
Task.tls.delete(key);
Task.wtls
- {WeakMap}
Task.wtls is a WeakMap object that is valid for the entire current task. The current task can store, read, and delete entries from anywhere. A WeakMap holds weak references to its keys. It supports only the get(), set(), has(), and delete() operations and does not support iteration.
Example
Task.wtls.set(key, value);
Task.wtls.get(key);
Task.wtls.delete(key);
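The difference between the two stores follows from standard JavaScript Map and WeakMap behavior, which can be tried outside JSRE. In the sketch below, ordinary `Map` and `WeakMap` instances (named `tls` and `wtls` purely for illustration) stand in for Task.tls and Task.wtls.

```javascript
// Standard Map vs WeakMap behavior, using local stand-ins for Task.tls / Task.wtls.
var tls = new Map();
var wtls = new WeakMap();

var session = { user: 'demo' }; // an object can be a key in both stores

tls.set('retries', 3);          // Map accepts any key type, including strings
tls.set(session, 'strongly held');
wtls.set(session, 'weakly held');

// A Map can be traversed; a WeakMap has no forEach(), keys(), or size.
var mapKeyCount = 0;
tls.forEach(function() { mapKeyCount++; });
var weakIsIterable = typeof wtls.forEach === 'function';

// String and number keys are rejected by WeakMap with a TypeError.
var primitiveKeyRejected = false;
try {
  wtls.set('retries', 3);
} catch (e) {
  primitiveKeyRejected = e instanceof TypeError;
}

// Entries in the Map stay alive until they are deleted explicitly.
tls.delete(session);
console.log(mapKeyCount, weakIsIterable, primitiveKeyRejected, tls.size, wtls.has(session));
```

A reasonable rule of thumb is to use the weak store for per-object bookkeeping that should disappear with the object, and the Map for named settings that are deleted deliberately.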
Task.features
- {Map}
Task.features is a Map object that stores working feature parameters of the current JS context. These parameters can be adjusted to control the operation of JSRE.
Task Object
task.id()
- Returns: {Integer} Task ID.
Get the ID of the previously created task. A return value of 0 means the task ID is invalid.
Example
var task = new Task('./task.js');
console.log('new task ID:', task.id());
task.isAlive()
- Returns: {Boolean} Whether this task is alive.
Check whether the specified task is alive. If the task has been detached, its alive status cannot be obtained.
Example
var task = new Task('./task.js');
console.log('task id:', task.id(), 'alive:', task.isAlive());
task.cancel([callback])
- callback {Function} Callback executed after the task exits. default: no callback.
  - id {Integer} ID of the task that was canceled.
  - retVal {Integer} Task return value.
- Returns: {Integer} 0: the request succeeded, 1: the task no longer exists.
This function requests deletion of the target task. The target task can detect the request by calling Task.testCancel().
Example
var task = new Task('./task.js');
task.cancel((id, retVal) => {
  console.log('Task canceled, return value:', retVal);
});
task.priority(prio)
- prio {String} New priority.
prio can only be one of:
- 'low'
- 'normal'
- 'high'
- 'realtime'
Set target task priority. Only privileged mode applications can set the 'realtime' priority. For details, please refer to Task.priority().
Example
var task = new Task('./task.js');
task.priority('low');
task.detach()
Detach the child task. The system reclaims its resources after the child task exits, and the task's return value can no longer be obtained.
The task object cannot be used after task.detach() has been called.
task.send(msg)
- msg {String} | {Object} Message to be sent.
- Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.
Send a message to the task. For details, please refer to Task.send().
Example
var task = new Task('./task.js');
task.send('test message');
task.send(buffer[, offset[, length[, addition]]])
- buffer {Buffer} The data to be sent.
- offset {Integer} Buffer offset. default: 0.
- length {Integer} Read length. default: buffer.length.
- addition {Object} Buffer additional information. default: no information.
- Returns: {Boolean} Returns true if the transmission succeeds, otherwise returns false.
Send binary data to the task. For large binary transfers, such as video, shared memory is recommended.
Example
var buffer = fs.readFile('./test');
task.send(buffer);
The addition object carries additional information that the receiver can use to determine the purpose of the buffer.
Example
var buffer = fs.readFile('./test');
task.send(buffer, undefined, undefined, { foo: 'bar' });
Task Events
The Task class inherits from the EventEmitter class. The following events are emitted in specific situations.
message
When the current task receives a message, the system emits this event.
Example
Task.on('message', function(msg, from) {
  // ...
});
Example
Task.on('message', function(msg, from) {
  if (Buffer.isBuffer(msg)) {
    console.log(msg.addition);
  }
});
uncaughtException
When a deeply nested callback function throws an exception, the current task can obtain the exception information through this event.
If this event is not subscribed to, the system prints the exception information and the call stack, and then exits with exit code 1 (exit(1)).
Example
Task.on('uncaughtException', (error) => {
  console.log('callback except!', error);
  console.trace(error.stack);
});
unhandledRejection
When the JavaScript engine encounters a promise rejection that is not handled, this event is emitted. The unhandledRejection event does not currently cause the process to exit, but JSRE may terminate the process in the future if the user does not handle this event.
Example
Task.on('unhandledRejection', (reason, promise) => {
  console.log('Unhandled Promise rejection:', reason);
  if (reason instanceof Error) {
    console.trace(reason.stack);
  }
});